/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.io.disk;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.PairGenerator;
import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode;
import org.apache.flink.runtime.operators.testutils.PairGenerator.Pair;
import org.apache.flink.runtime.operators.testutils.PairGenerator.ValueMode;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.EOFException;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class FileChannelStreamsITCase {

    private static final long SEED = 649180756312423613L;

    private static final int KEY_MAX = Integer.MAX_VALUE;

    private static final int VALUE_SHORT_LENGTH = 114;

    private static final int VALUE_LONG_LENGTH = 112 * 1024;

    private static final int NUM_PAIRS_SHORT = 1000000;

    private static final int NUM_PAIRS_LONG = 3000;

    private static final int MEMORY_PAGE_SIZE = 32 * 1024;

    private static final int NUM_MEMORY_SEGMENTS = 3;

    private IOManager ioManager;

    private MemoryManager memManager;

    // --------------------------------------------------------------------------------------------

    @BeforeEach
    void beforeTest() {
        memManager =
                MemoryManagerBuilder.newBuilder()
                        .setMemorySize(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE)
                        .setPageSize(MEMORY_PAGE_SIZE)
                        .build();
        ioManager = new IOManagerAsync();
    }

    @AfterEach
    void afterTest() throws Exception {
        ioManager.close();
        assertThat(memManager.verifyEmpty())
                .withFailMessage("The memory has not been properly released")
                .isTrue();
    }

    // --------------------------------------------------------------------------------------------

    @Test
    void testWriteReadSmallRecords() throws Exception {
        List<MemorySegment> memory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final PairGenerator generator =
                new PairGenerator(
                        SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
        final FileIOChannel.ID channel = ioManager.createChannel();

        // create the writer output view
        final BlockChannelWriter<MemorySegment> writer =
                ioManager.createBlockChannelWriter(channel);
        final FileChannelOutputView outView =
                new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);

        // write a number of pairs
        Pair pair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            pair.write(outView);
        }
        outView.close();

        // create the reader input view
        List<MemorySegment> readMemory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final BlockChannelReader<MemorySegment> reader =
                ioManager.createBlockChannelReader(channel);
        final FileChannelInputView inView =
                new FileChannelInputView(
                        reader, memManager, readMemory, outView.getBytesInLatestSegment());
        generator.reset();

        // read and re-generate all records and compare them
        Pair readPair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            readPair.read(inView);
            assertThat(readPair)
                    .withFailMessage("The re-generated and the read record do not match.")
                    .isEqualTo(pair);
        }

        inView.close();
        reader.deleteChannel();
    }

    @Test
    void testWriteAndReadLongRecords() throws Exception {
        final List<MemorySegment> memory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final PairGenerator generator =
                new PairGenerator(
                        SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
        final FileIOChannel.ID channel = this.ioManager.createChannel();

        // create the writer output view
        final BlockChannelWriter<MemorySegment> writer =
                this.ioManager.createBlockChannelWriter(channel);
        final FileChannelOutputView outView =
                new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);

        // write a number of pairs
        Pair pair = new Pair();
        for (int i = 0; i < NUM_PAIRS_LONG; i++) {
            generator.next(pair);
            pair.write(outView);
        }
        outView.close();

        // create the reader input view
        List<MemorySegment> readMemory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final BlockChannelReader<MemorySegment> reader =
                ioManager.createBlockChannelReader(channel);
        final FileChannelInputView inView =
                new FileChannelInputView(
                        reader, memManager, readMemory, outView.getBytesInLatestSegment());
        generator.reset();

        // read and re-generate all records and compare them
        Pair readPair = new Pair();
        for (int i = 0; i < NUM_PAIRS_LONG; i++) {
            generator.next(pair);
            readPair.read(inView);
            assertThat(readPair)
                    .withFailMessage("The re-generated and the read record do not match.")
                    .isEqualTo(pair);
        }

        inView.close();
        reader.deleteChannel();
    }

    @Test
    void testReadTooMany() throws Exception {
        final List<MemorySegment> memory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final PairGenerator generator =
                new PairGenerator(
                        SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
        final FileIOChannel.ID channel = this.ioManager.createChannel();

        // create the writer output view
        final BlockChannelWriter<MemorySegment> writer =
                this.ioManager.createBlockChannelWriter(channel);
        final FileChannelOutputView outView =
                new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);

        // write a number of pairs
        Pair pair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            pair.write(outView);
        }
        outView.close();

        // create the reader input view
        List<MemorySegment> readMemory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final BlockChannelReader<MemorySegment> reader =
                ioManager.createBlockChannelReader(channel);
        final FileChannelInputView inView =
                new FileChannelInputView(
                        reader, memManager, readMemory, outView.getBytesInLatestSegment());
        generator.reset();

        // read and re-generate all records and compare them
        Pair readPair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            readPair.read(inView);
            assertThat(readPair)
                    .withFailMessage("The re-generated and the read record do not match.")
                    .isEqualTo(pair);
        }

        generator.next(pair);
        assertThatThrownBy(() -> readPair.read(inView))
                .withFailMessage("Read too much, expected EOFException.")
                .isInstanceOf(EOFException.class);

        inView.close();
        reader.deleteChannel();
    }

    @Test
    void testWriteReadOneBufferOnly() throws Exception {
        final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), 1);

        final PairGenerator generator =
                new PairGenerator(
                        SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
        final FileIOChannel.ID channel = this.ioManager.createChannel();

        // create the writer output view
        final BlockChannelWriter<MemorySegment> writer =
                this.ioManager.createBlockChannelWriter(channel);
        final FileChannelOutputView outView =
                new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);

        // write a number of pairs
        Pair pair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            pair.write(outView);
        }
        outView.close();

        // create the reader input view
        List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 1);

        final BlockChannelReader<MemorySegment> reader =
                ioManager.createBlockChannelReader(channel);
        final FileChannelInputView inView =
                new FileChannelInputView(
                        reader, memManager, readMemory, outView.getBytesInLatestSegment());
        generator.reset();

        // read and re-generate all records and compare them
        Pair readPair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            readPair.read(inView);
            assertThat(readPair)
                    .withFailMessage("The re-generated and the read record do not match.")
                    .isEqualTo(pair);
        }

        inView.close();
        reader.deleteChannel();
    }

    @Test
    void testWriteReadNotAll() throws Exception {
        final List<MemorySegment> memory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final PairGenerator generator =
                new PairGenerator(
                        SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
        final FileIOChannel.ID channel = this.ioManager.createChannel();

        // create the writer output view
        final BlockChannelWriter<MemorySegment> writer =
                this.ioManager.createBlockChannelWriter(channel);
        final FileChannelOutputView outView =
                new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);

        // write a number of pairs
        Pair pair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
            generator.next(pair);
            pair.write(outView);
        }
        outView.close();

        // create the reader input view
        List<MemorySegment> readMemory =
                memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);

        final BlockChannelReader<MemorySegment> reader =
                ioManager.createBlockChannelReader(channel);
        final FileChannelInputView inView =
                new FileChannelInputView(
                        reader, memManager, readMemory, outView.getBytesInLatestSegment());
        generator.reset();

        // read and re-generate all records and compare them
        Pair readPair = new Pair();
        for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
            generator.next(pair);
            readPair.read(inView);
            assertThat(readPair)
                    .withFailMessage("The re-generated and the read record do not match.")
                    .isEqualTo(pair);
        }

        inView.close();
        reader.deleteChannel();
    }
}
